In [1]:
#pip install dask
#pip install dask-ml
import plotly.io as pio
#pio.renderers.default='notebook'
In [2]:
import dask
import dask.dataframe as dd
from time import time
import plotly.express as px
import pandas as pd
import statsmodels.api as sm
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_squared_error
from time import time
import multiprocessing as mp
import dask.dataframe as dd
import statsmodels.api as sm
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_squared_error
from time import time
from dask_ml.linear_model import LinearRegression
from scipy.stats.mstats import winsorize
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')
C:\Users\K\AppData\Local\Temp\ipykernel_4376\1015654004.py:2: DeprecationWarning: The current Dask DataFrame implementation is deprecated.
In a future release, Dask DataFrame will use a new implementation that
contains several improvements including a logical query planning.
The user-facing DataFrame API will remain unchanged.
The new implementation is already available and can be enabled by
installing the dask-expr library:
$ pip install dask-expr
and turning the query planning option on:
>>> import dask
>>> dask.config.set({'dataframe.query-planning': True})
>>> import dask.dataframe as dd
API documentation for the new implementation is available at
https://docs.dask.org/en/stable/dask-expr-api.html
Any feedback can be reported on the Dask issue tracker
https://github.com/dask/dask/issues
To disable this warning in the future, set dask config:
# via Python
>>> dask.config.set({'dataframe.query-planning-warning': False})
# via CLI
dask config set dataframe.query-planning-warning False
import dask.dataframe as dd
Data Processing¶
In [3]:
%%time
## Load full trip data
data_full = dd.read_csv("data/Trips_Full Data.csv", assume_missing=True)
#load trip data by distance
data_dist = dd.read_csv("data/Trips_by_Distance.csv", assume_missing=True, dtype={'County Name': 'object', 'State Postal Code': 'object'})
CPU times: total: 78.1 ms Wall time: 141 ms
In [4]:
# Check for missing values
missing_full = data_full.isnull().sum().compute()
missing_dist = data_dist.isnull().sum().compute()
In [5]:
#number missing values per variable
print("Missing values in full trip data:")
print(missing_full)
print("\nMissing values in trip data by distance:")
print(missing_dist)
Missing values in full trip data: Month of Date 0 Week of Date 0 Year of Date 0 Level 0 Date 0 Week Ending Date 0 Trips <1 Mile 0 People Not Staying at Home 0 Population Staying at Home 0 Trips 0 Trips 1-25 Miles 0 Trips 1-3 Miles 0 Trips 10-25 Miles 0 Trips 100-250 Miles 0 Trips 100+ Miles 0 Trips 25-100 Miles 0 Trips 25-50 Miles 0 Trips 250-500 Miles 0 Trips 3-5 Miles 0 Trips 5-10 Miles 0 Trips 50-100 Miles 0 Trips 500+ Miles 0 dtype: int64 Missing values in trip data by distance: Level 0 Date 0 State FIPS 901 State Postal Code 901 County FIPS 46852 County Name 46852 Population Staying at Home 12950 Population Not Staying at Home 12950 Number of Trips 12950 Number of Trips <1 12950 Number of Trips 1-3 12950 Number of Trips 3-5 12950 Number of Trips 5-10 12950 Number of Trips 10-25 12950 Number of Trips 25-50 12950 Number of Trips 50-100 12950 Number of Trips 100-250 12950 Number of Trips 250-500 12950 Number of Trips >=500 12950 Row ID 0 Week 0 Month 0 dtype: int64
In [6]:
%%time
# Drop columns with more than 50% missing entries
threshold = 0.5
columns_to_drop_full = missing_full[missing_full > threshold * len(data_full)].index
columns_to_drop_dist = missing_dist[missing_dist > threshold * len(data_dist)].index
data_full = data_full.drop(columns=columns_to_drop_full)
data_dist = data_dist.drop(columns=columns_to_drop_dist)
CPU times: total: 4.06 s Wall time: 8.75 s
In [7]:
# Drop rows with missing data
data_full = data_full.dropna().reset_index()
data_dist = data_dist.dropna().reset_index()
In [8]:
%%time
# Display info after dropping columns and rows
print("\nInfo after dropping columns and rows:")
print("Full trip data:")
print(data_full.compute().info())
print("\nTrip data by distance:")
print(data_dist.compute().info())
Info after dropping columns and rows: Full trip data: <class 'pandas.core.frame.DataFrame'> RangeIndex: 7 entries, 0 to 6 Data columns (total 23 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 index 7 non-null int64 1 Month of Date 7 non-null string 2 Week of Date 7 non-null string 3 Year of Date 7 non-null float64 4 Level 7 non-null string 5 Date 7 non-null string 6 Week Ending Date 7 non-null string 7 Trips <1 Mile 7 non-null float64 8 People Not Staying at Home 7 non-null float64 9 Population Staying at Home 7 non-null float64 10 Trips 7 non-null float64 11 Trips 1-25 Miles 7 non-null float64 12 Trips 1-3 Miles 7 non-null float64 13 Trips 10-25 Miles 7 non-null float64 14 Trips 100-250 Miles 7 non-null float64 15 Trips 100+ Miles 7 non-null float64 16 Trips 25-100 Miles 7 non-null float64 17 Trips 25-50 Miles 7 non-null float64 18 Trips 250-500 Miles 7 non-null float64 19 Trips 3-5 Miles 7 non-null float64 20 Trips 5-10 Miles 7 non-null float64 21 Trips 50-100 Miles 7 non-null float64 22 Trips 500+ Miles 7 non-null float64 dtypes: float64(17), int64(1), string(5) memory usage: 1.7 KB None Trip data by distance: <class 'pandas.core.frame.DataFrame'> Index: 988773 entries, 0 to 515015 Data columns (total 23 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 index 988773 non-null int64 1 Level 988773 non-null string 2 Date 988773 non-null string 3 State FIPS 988773 non-null float64 4 State Postal Code 988773 non-null string 5 County FIPS 988773 non-null float64 6 County Name 988773 non-null string 7 Population Staying at Home 988773 non-null float64 8 Population Not Staying at Home 988773 non-null float64 9 Number of Trips 988773 non-null float64 10 Number of Trips <1 988773 non-null float64 11 Number of Trips 1-3 988773 non-null float64 12 Number of Trips 3-5 988773 non-null float64 13 Number of Trips 5-10 988773 non-null float64 14 Number of Trips 10-25 988773 non-null float64 15 Number of Trips 25-50 988773 non-null float64 16 Number of Trips 50-100 988773 non-null float64 17 Number of Trips 100-250 988773 non-null float64 18 Number of Trips 250-500 988773 non-null float64 19 Number of Trips >=500 988773 non-null float64 20 Row ID 988773 non-null string 21 Week 988773 non-null float64 22 Month 988773 non-null float64 dtypes: float64(17), int64(1), string(5) memory usage: 226.7 MB None CPU times: total: 5.55 s Wall time: 10 s
Check if there are any outliers¶
In [9]:
fig = px.box(data_dist.compute(), y='Population Staying at Home', title='Box Plot of Trips')
# Update layout for better visualization
fig.update_layout(yaxis_title='Trips', boxmode='group')
# display figure
fig.update_layout(height=600, width=1200)
# Display the figure
fig.show()
There are some outliers in the data. To handle outleirs, we will apply winsorization (https://www.geeksforgeeks.org/winsorization/)
1A - Data Aggregation (People staying at home)¶
In [10]:
def process_data_dask(file_path, num_workers):
# Set the Dask scheduler and number of workers
dask.config.set(scheduler='processes', num_workers=num_workers)
# Read CSV file into a Dask DataFrame
start = time()
air = dd.read_csv(file_path, assume_missing=True, dtype={'County Name': 'object', 'State Postal Code': 'object'})
#process the data
# Drop columns with more than 50% missing entries
missing = air.isnull().sum().compute()
threshold = 0.5
columns_to_drop = missing[missing > threshold * len(air)].index
air = air.drop(columns=columns_to_drop)
air = air.dropna().reset_index().compute()
# Winsorize outliers for numeric columns
numeric_columns = air.select_dtypes(include='number').columns
#air[numeric_columns] = air[numeric_columns].map(winsorize, axis=0)
# Drop rows with missing values after winsorization
air = air.dropna().reset_index(drop=True)
read_time = round(time() - start, 2)
# Perform groupby and compute mean
start = time()
by_week = air.groupby('Week')['Population Staying at Home'].mean()
result = round(by_week, 2)
groupby_time = round(time() - start, 2)
return result, read_time, groupby_time
def process_data_sequential(file_path):
dask.config.set(scheduler='synchronous')
# Read CSV file into a Dask DataFrame
start = time()
air = dd.read_csv(file_path, assume_missing=True, dtype={'County Name': 'object', 'State Postal Code': 'object'})
#process the data
# Drop columns with more than 50% missing entries
missing = air.isnull().sum().compute()
threshold = 0.5
columns_to_drop = missing[missing > threshold * len(air)].index
air = air.drop(columns=columns_to_drop)
air = air.dropna().reset_index().compute()
# Winsorize outliers for numeric columns
numeric_columns = air.select_dtypes(include='number').columns
#air[numeric_columns] = air[numeric_columns].map(winsorize, axis=0)
# Drop rows with missing values after winsorization
air = air.dropna().reset_index(drop=True)
read_time = round(time() - start, 2)
# Perform groupby and compute mean
start = time()
by_week = air.groupby('Week')['Population Staying at Home'].mean()
result = round(by_week, 2)
groupby_time = round(time() - start, 2)
return result, read_time, groupby_time
def main_dask():
file_path = "data/Trips_by_Distance.csv"
# Process data sequentially using Dask
result_seq, read_time_seq, groupby_time_seq = process_data_sequential(file_path)
total_time_seq_1a = read_time_seq + groupby_time_seq
print("Results with sequential processing:")
display(result_seq)
print("Total Time: {} seconds".format(total_time_seq_1a))
print("Read Time: {} seconds".format(read_time_seq))
print("Groupby Time: {} seconds".format(groupby_time_seq))
print("Results with 10 processors:")
# Process data using Dask with 10 processors
result_10, read_time_10, groupby_time_10 = process_data_dask(file_path, num_workers=10)
total_time_dask_10_1a = read_time_10 + groupby_time_10
print("\nResults with 10 processors:")
display(result_10)
print("Total Time: {} seconds".format(total_time_dask_10_1a))
print("Read Time: {} seconds".format(read_time_10))
print("Groupby Time: {} seconds".format(groupby_time_10))
# Process data using Dask with 20 processors
result_20, read_time_20, groupby_time_20 = process_data_dask(file_path, num_workers=20)
total_time_dask_20_1a = read_time_20 + groupby_time_20
print("\nResults with 20 processors:")
display(result_20)
print("Total Time: {} seconds".format(total_time_dask_20_1a))
print("Read Time: {} seconds".format(read_time_20))
print("Groupby Time: {} seconds".format(groupby_time_20))
#return one aggregation result for visualization
return result_20, total_time_dask_10_1a, total_time_dask_20_1a, total_time_seq_1a
if __name__ == '__main__':
results, total_time_dask_10_1a, total_time_dask_20_1a, total_time_seq_1a = main_dask()
Results with sequential processing:
Week 0.0 20976.79 1.0 20071.31 2.0 19897.34 3.0 19889.44 4.0 20096.31 5.0 19757.43 6.0 19599.07 7.0 20033.60 8.0 19741.67 9.0 20043.10 10.0 19826.90 11.0 20264.39 12.0 20632.23 13.0 20792.38 14.0 21064.02 15.0 21375.30 16.0 20066.29 17.0 19666.74 18.0 20020.24 19.0 19679.55 20.0 19881.39 21.0 20060.82 22.0 19897.75 23.0 20503.63 24.0 20571.77 25.0 20088.39 26.0 20823.99 27.0 20249.93 28.0 20038.48 29.0 20758.97 30.0 20662.34 31.0 20285.02 32.0 19052.69 33.0 18745.05 34.0 18890.03 35.0 20627.34 36.0 19918.28 37.0 20077.21 38.0 20648.27 39.0 21323.44 40.0 21132.91 41.0 21656.94 42.0 21958.42 43.0 21998.57 44.0 7465.21 Name: Population Staying at Home, dtype: float64
Total Time: 39.29 seconds Read Time: 39.21 seconds Groupby Time: 0.08 seconds Results with 10 processors: Results with 10 processors:
Week 0.0 20976.79 1.0 20071.31 2.0 19897.34 3.0 19889.44 4.0 20096.31 5.0 19757.43 6.0 19599.07 7.0 20033.60 8.0 19741.67 9.0 20043.10 10.0 19826.90 11.0 20264.39 12.0 20632.23 13.0 20792.38 14.0 21064.02 15.0 21375.30 16.0 20066.29 17.0 19666.74 18.0 20020.24 19.0 19679.55 20.0 19881.39 21.0 20060.82 22.0 19897.75 23.0 20503.63 24.0 20571.77 25.0 20088.39 26.0 20823.99 27.0 20249.93 28.0 20038.48 29.0 20758.97 30.0 20662.34 31.0 20285.02 32.0 19052.69 33.0 18745.05 34.0 18890.03 35.0 20627.34 36.0 19918.28 37.0 20077.21 38.0 20648.27 39.0 21323.44 40.0 21132.91 41.0 21656.94 42.0 21958.42 43.0 21998.57 44.0 7465.21 Name: Population Staying at Home, dtype: float64
Total Time: 52.900000000000006 seconds Read Time: 52.84 seconds Groupby Time: 0.06 seconds Results with 20 processors:
Week 0.0 20976.79 1.0 20071.31 2.0 19897.34 3.0 19889.44 4.0 20096.31 5.0 19757.43 6.0 19599.07 7.0 20033.60 8.0 19741.67 9.0 20043.10 10.0 19826.90 11.0 20264.39 12.0 20632.23 13.0 20792.38 14.0 21064.02 15.0 21375.30 16.0 20066.29 17.0 19666.74 18.0 20020.24 19.0 19679.55 20.0 19881.39 21.0 20060.82 22.0 19897.75 23.0 20503.63 24.0 20571.77 25.0 20088.39 26.0 20823.99 27.0 20249.93 28.0 20038.48 29.0 20758.97 30.0 20662.34 31.0 20285.02 32.0 19052.69 33.0 18745.05 34.0 18890.03 35.0 20627.34 36.0 19918.28 37.0 20077.21 38.0 20648.27 39.0 21323.44 40.0 21132.91 41.0 21656.94 42.0 21958.42 43.0 21998.57 44.0 7465.21 Name: Population Staying at Home, dtype: float64
Total Time: 51.82 seconds Read Time: 51.77 seconds Groupby Time: 0.05 seconds
In [11]:
fig = px.line(pd.DataFrame(results).reset_index(), x='Week', y='Population Staying at Home', title='Average Number of People by Week')
#layout for better visualization
fig.update_layout(xaxis_title='Week', yaxis_title='Average Number of People Staying at Home by Week')
# figure
fig.update_layout(height=600, width=1200)
#line color
fig.update_traces(line=dict(color='steelblue'))
fig.show()
1B - Data Aggregation (How far people travel)¶
In [12]:
# Function to read the data
def read_data(file_path):
start = time()
air = dd.read_csv(file_path, assume_missing=True , dtype={'County Name': 'object', 'State Postal Code': 'object'})
#process the data
# Drop columns with more than 50% missing entries
missing = air.isnull().sum().compute()
threshold = 0.5
columns_to_drop = missing[missing > threshold * len(air)].index
air = air.drop(columns=columns_to_drop)
air = air.dropna().reset_index()
# Winsorize outliers for numeric columns
numeric_columns = air.select_dtypes(include='number').columns
# Drop rows with missing values after winsorization
air = air.dropna().reset_index(drop=True)
read_time = round(time() - start, 2)
return air, read_time
def group_week_dask(data, num_workers):
dask.config.set(scheduler='processes', num_workers=num_workers)
start = time()
# Extracting week numbers from 'Week Ending Date' column
data = data.assign(Week = lambda x: data['Week of Date'].str.extract('(\d+)'))
# Group by 'Week' and calculate the mean for selected columns
averages = data.groupby("Week")[['Trips 1-25 Miles',
'Trips 1-3 Miles', 'Trips 10-25 Miles', 'Trips 100-250 Miles',
'Trips 100+ Miles', 'Trips 25-100 Miles', 'Trips 25-50 Miles',
'Trips 250-500 Miles', 'Trips 3-5 Miles', 'Trips 5-10 Miles',
'Trips 50-100 Miles', 'Trips 500+ Miles']].mean()
result = averages.compute()
groupby_time = round(time() - start, 2)
return result, groupby_time
def group_week_dask_sequential(data):
dask.config.set(scheduler='synchronous')
start = time()
# Extracting week numbers from 'Week Ending Date' column
data = data.assign(Week = lambda x: data['Week of Date'].str.extract('(\d+)'))
# Group by 'Week' and calculate the mean for selected columns
averages = data.groupby("Week")[['Trips 1-25 Miles',
'Trips 1-3 Miles', 'Trips 10-25 Miles', 'Trips 100-250 Miles',
'Trips 100+ Miles', 'Trips 25-100 Miles', 'Trips 25-50 Miles',
'Trips 250-500 Miles', 'Trips 3-5 Miles', 'Trips 5-10 Miles',
'Trips 50-100 Miles', 'Trips 500+ Miles']].mean()
result = averages.compute()
groupby_time = round(time() - start, 2)
return result, groupby_time
def compare_performance(file_path):
# Read data sequentially
air_seq, read_time_seq = read_data(file_path)
result_seq, groupby_time_seq = group_week_dask_sequential(air_seq)
total_time_seq_1b = read_time_seq + groupby_time_seq
print("Results with sequential processing:")
display(round(result_seq))
print("Total Time: {} seconds".format(total_time_seq_1b))
print("Read Time: {} seconds".format(read_time_seq))
print("Groupby Time: {} seconds".format(groupby_time_seq))
# Read data using Dask with 10 processors
air_dask_10, read_time_dask_10 = read_data(file_path)
result_dask_10, groupby_time_dask_10 = group_week_dask(air_dask_10, num_workers=10)
total_time_dask_10 = read_time_dask_10 + groupby_time_dask_10
print("\nResults with Dask processing (10 processors):")
display(round(result_dask_10))
print("Total Time: {} seconds".format(total_time_dask_10))
print("Read Time: {} seconds".format(read_time_dask_10))
print("Groupby Time: {} seconds".format(groupby_time_dask_10))
# Read data using Dask with 20 processors
air_dask_20, read_time_dask_20 = read_data(file_path)
result_dask_20, groupby_time_dask_20 = group_week_dask(air_dask_20, num_workers=20)
total_time_dask_20 = read_time_dask_20 + groupby_time_dask_20
print("\nResults with Dask processing (20 processors):")
display(round(result_dask_20))
print("Total Time: {} seconds".format(total_time_dask_20))
print("Read Time: {} seconds".format(read_time_dask_20))
print("Groupby Time: {} seconds".format(groupby_time_dask_20))
#return dataframe for visualization
return result_dask_20, total_time_dask_10, total_time_dask_20, total_time_seq_1b
if __name__ == '__main__':
file_path = "data/Trips_Full Data.csv"
results_1b, total_time_dask_10, total_time_dask_20, total_time_seq_1b = compare_performance(file_path)
Results with sequential processing:
| Trips 1-25 Miles | Trips 1-3 Miles | Trips 10-25 Miles | Trips 100-250 Miles | Trips 100+ Miles | Trips 25-100 Miles | Trips 25-50 Miles | Trips 250-500 Miles | Trips 3-5 Miles | Trips 5-10 Miles | Trips 50-100 Miles | Trips 500+ Miles | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| Week | ||||||||||||
| 32 | 1.015555e+09 | 369476657.0 | 231078511.0 | 6850130.0 | 12122473.0 | 88037455.0 | 69159131.0 | 1829242.0 | 181555834.0 | 233444464.0 | 18878323.0 | 3443101.0 |
Total Time: 11.28 seconds Read Time: 11.04 seconds Groupby Time: 0.24 seconds Results with Dask processing (10 processors):
| Trips 1-25 Miles | Trips 1-3 Miles | Trips 10-25 Miles | Trips 100-250 Miles | Trips 100+ Miles | Trips 25-100 Miles | Trips 25-50 Miles | Trips 250-500 Miles | Trips 3-5 Miles | Trips 5-10 Miles | Trips 50-100 Miles | Trips 500+ Miles | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| Week | ||||||||||||
| 32 | 1.015555e+09 | 369476657.0 | 231078511.0 | 6850130.0 | 12122473.0 | 88037455.0 | 69159131.0 | 1829242.0 | 181555834.0 | 233444464.0 | 18878323.0 | 3443101.0 |
Total Time: 5.4 seconds Read Time: 0.37 seconds Groupby Time: 5.03 seconds Results with Dask processing (20 processors):
| Trips 1-25 Miles | Trips 1-3 Miles | Trips 10-25 Miles | Trips 100-250 Miles | Trips 100+ Miles | Trips 25-100 Miles | Trips 25-50 Miles | Trips 250-500 Miles | Trips 3-5 Miles | Trips 5-10 Miles | Trips 50-100 Miles | Trips 500+ Miles | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| Week | ||||||||||||
| 32 | 1.015555e+09 | 369476657.0 | 231078511.0 | 6850130.0 | 12122473.0 | 88037455.0 | 69159131.0 | 1829242.0 | 181555834.0 | 233444464.0 | 18878323.0 | 3443101.0 |
Total Time: 16.240000000000002 seconds Read Time: 10.66 seconds Groupby Time: 5.58 seconds
In [14]:
melted_data = results_1b.reset_index().melt(id_vars='Week', var_name='Trip', value_name='Average Trips')
# Plot the transposed data
fig = px.bar(melted_data, x='Trip', y='Average Trips', color='Trip',
title='Average Trips per Week for Different Trips',
labels={'Week': 'Week', 'Average Trips': 'Average Trips'})
fig.update_layout(height=600, width=1200)
fig.show()
1 C - Filtering¶
In [15]:
import dask
import dask.dataframe as dd
from time import time
def read_data(file_path):
#import data using Dask
start_read_time = time()
dtypes = {'County Name': 'object', 'State Postal Code': 'object', 'Number of Trips': 'float64',
'Number of Trips 1-3': 'float64',
'Number of Trips 10-25': 'float64',
'Number of Trips 100-250': 'float64',
'Number of Trips 25-50': 'float64',
'Number of Trips 250-500': 'float64',
'Number of Trips 3-5': 'float64',
'Number of Trips 5-10': 'float64',
'Number of Trips 50-100': 'float64',
'Number of Trips <1': 'float64',
'Number of Trips >=500': 'float64',
'Population Not Staying at Home': 'float64',
'Population Staying at Home': 'float64'}
data = dd.read_csv(file_path, dtype=dtypes)
end_read_time = time()
read_time = round(end_read_time - start_read_time, 2)
return data, read_time
def filter_data(data, num_workers=None):
# Select analysis columns
selected_columns = ['Date', 'Number of Trips 10-25', 'Number of Trips 50-100']
data_selected = data[selected_columns]
# Drop rows with missing entries
data_selected = data_selected.dropna()
# Record start time for filtering
start_filter_time = time()
# Filter data for > 100,000,000 people and 10-25 number of trips
filtered_10_25 = data_selected[data_selected['Number of Trips 10-25'] > 100000000]
# Filter data for > 10,000,000 people and 50-100 number of trips
filtered_50_100 = data_selected[data_selected['Number of Trips 50-100'] > 10000000]
# Compute the results using Dask delayed
if num_workers is not None:
dask.config.set(scheduler='synchronous')
# Lazily compute the results using Dask delayed
filtered_10_25 = dask.delayed(filtered_10_25.compute)()
filtered_50_100 = dask.delayed(filtered_50_100.compute)()
# Record end time for filtering
end_filter_time = time()
filter_time = round(end_filter_time - start_filter_time, 2)
return filtered_10_25, filtered_50_100, filter_time
def main():
file_path = "data/Trips_by_Distance.csv"
# Processing sequentially
data_seq, read_time_seq = read_data(file_path)
filtered_10_25_seq, filtered_50_100_seq, filter_time_seq = filter_data(data_seq.compute(scheduler='synchronous'))
total_time_seq = read_time_seq + filter_time_seq
print("\nResults with sequential processing (synchronous scheduler):")
print("\nRead Time (Sequential): {} seconds".format(read_time_seq))
print("Filtering Time (Sequential): {} seconds".format(filter_time_seq))
print("Total Time (Sequential): {} seconds".format(total_time_seq))
# Processing with 10 processors
data_10, read_time_10 = read_data(file_path)
filtered_10_25_10, filtered_50_100_10, filter_time_10 = filter_data(data_10, num_workers=10)
total_time_10 = read_time_10 + filter_time_10
print("\nResults with 10 processors (synchronous scheduler):")
print("\nRead Time (10 processors): {} seconds".format(read_time_10))
print("Filtering Time (10 processors): {} seconds".format(filter_time_10))
print("Total Time (10 processors): {} seconds".format(total_time_10))
# Processing with 20 processors
data_20, read_time_20 = read_data(file_path)
filtered_10_25_20, filtered_50_100_20, filter_time_20 = filter_data(data_20, num_workers=20)
total_time_20 = read_time_20 + filter_time_20
print("\nResults with 20 processors (synchronous scheduler):")
print("\nRead Time (20 processors): {} seconds".format(read_time_20))
print("Filtering Time (20 processors): {} seconds".format(filter_time_20))
print("Total Time (20 processors): {} seconds".format(total_time_20))
# Return DataFrames for visualization
return (
filtered_10_25_seq, filtered_50_100_seq, total_time_seq,
filtered_10_25_10, filtered_50_100_10, total_time_10,
filtered_10_25_20, filtered_50_100_20, total_time_20
)
if __name__ == '__main__':
(
filtered_10_25_seq, filtered_50_100_seq, total_time_seq,
filtered_10_25_10, filtered_50_100_10, total_time_10,
filtered_10_25_20, filtered_50_100_20, total_time_20
) = main()
Results with sequential processing (synchronous scheduler): Read Time (Sequential): 0.05 seconds Filtering Time (Sequential): 0.01 seconds Total Time (Sequential): 0.060000000000000005 seconds Results with 10 processors (synchronous scheduler): Read Time (10 processors): 0.07 seconds Filtering Time (10 processors): 0.01 seconds Total Time (10 processors): 0.08 seconds Results with 20 processors (synchronous scheduler): Read Time (20 processors): 0.06 seconds Filtering Time (20 processors): 0.01 seconds Total Time (20 processors): 0.06999999999999999 seconds
In [16]:
#plot number of participants
compared = filtered_10_25_20.merge(filtered_50_100_20).compute()
In [17]:
fig = px.line(compared, x='Date', y=['Number of Trips 10-25', 'Number of Trips 50-100'],
title='Filtered Trips with > 100,000,000 people and 10-25 or 50-100 Number of Trips')
# Update layout for better visualization
fig.update_layout(xaxis_title='Date', yaxis_title='Number of Trips', legend_title='Trip Categories')
fig.update_layout(legend=dict(
orientation="h",
yanchor="middle",
y=1.02,
xanchor="right",
x=1
))
fig.update_layout(height=600, width=1200)
fig.show()
In [ ]:
Compare Time used for Sequential, 10 Processors, and 20 Processors¶
In [19]:
processors = ["10 Processors Q1.A", '20 Processors Q1.A',"Sequential Q1.A","10 Processors Q1.B",
'20 Processors Q1.B',"Sequential Q1.B", "Sequential Q1.C","10 Processors Q1.C", '20 Processors Q1.C']
processing_time = [total_time_dask_10_1a, total_time_dask_20_1a, total_time_seq_1a,
total_time_dask_10, total_time_dask_20, total_time_seq_1b, total_time_seq, total_time_10, total_time_20]
In [42]:
# Define the processors and processing times
processors = ["10 Processors Q1.A", '20 Processors Q1.A', "Sequential Q1.A", "10 Processors Q1.B",
'20 Processors Q1.B', "Sequential Q1.B", "Sequential Q1.C", "10 Processors Q1.C", '20 Processors Q1.C']
processors_level = ["10 Processors", '20 Processors', "Sequential", "10 Processors",
'20 Processors', "Sequential", "Sequential", "10 Processors", '20 Processors']
processing_time = [total_time_dask_10_1a, total_time_dask_20_1a, total_time_seq_1a,
total_time_dask_10, total_time_dask_20, total_time_seq_1b, total_time_seq, total_time_10, total_time_20]
# Create a DataFrame
df = pd.DataFrame({'Processor': processors,'Level':processors_level, 'Processing Time (seconds)': processing_time})
# show processing times
df
Out[42]:
| Processor | Level | Processing Time (seconds) | |
|---|---|---|---|
| 0 | 10 Processors Q1.A | 10 Processors | 52.90 |
| 1 | 20 Processors Q1.A | 20 Processors | 51.82 |
| 2 | Sequential Q1.A | Sequential | 39.29 |
| 3 | 10 Processors Q1.B | 10 Processors | 5.40 |
| 4 | 20 Processors Q1.B | 20 Processors | 16.24 |
| 5 | Sequential Q1.B | Sequential | 11.28 |
| 6 | Sequential Q1.C | Sequential | 0.06 |
| 7 | 10 Processors Q1.C | 10 Processors | 0.08 |
| 8 | 20 Processors Q1.C | 20 Processors | 0.07 |
In [43]:
#df.to_csv("processing times.csv", index = False)
In [22]:
# Compare mean processing times
mean_times = pd.DataFrame(df.groupby('Level')['Processing Time (seconds)'].mean())
print("\nMean Processing Times:")
mean_times
Mean Processing Times:
Out[22]:
| Processing Time (seconds) | |
|---|---|
| Level | |
| 10 Processors | 19.460000 |
| 20 Processors | 22.710000 |
| Sequential | 16.876667 |
In [23]:
#bar chart for mean processing times
fig = px.bar(mean_times.reset_index(), x='Level', y='Processing Time (seconds)',
title='Mean Processing Times by Processing type',
labels={'Processing Time (seconds)': 'Mean Processing Time (seconds)'})
# Update layout for better visualization
fig.update_layout(xaxis_title='Level', yaxis_title='Mean Processing Time (seconds)')
# Display the figure
fig.show()
1E - Modeling¶
In [24]:
data_distance = dd.read_csv('data/Trips_by_Distance.csv', assume_missing=True, dtype={'County Name': 'object', 'State Postal Code': 'object'})
#subset data for use
selected_columns = [
'Population Not Staying at Home',
'Number of Trips <1', 'Number of Trips 1-3', 'Number of Trips 3-5',
'Number of Trips 5-10', 'Number of Trips 10-25',
'Number of Trips 25-50', 'Number of Trips 50-100',
'Number of Trips 100-250', 'Number of Trips 250-500',
'Number of Trips >=500', 'Week'
]
data_selected = data_distance[selected_columns]
In [25]:
###test for linearity
# Compute Pearson correlation coefficients
correlation_y_x = data_selected.corr(method='pearson').compute()
In [26]:
#heatmap for the correlation matrix
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_y_x, annot=True, cmap='coolwarm', fmt=".2f", linewidths=0.5)
plt.title('Pearson Correlation Coefficients - Linearity Test')
plt.show()
Since all the variables 'Number of Trips <1', 'Number of Trips 1-3', 'Number of Trips 3-5', 'Number of Trips 5-10', 'Number of Trips 10-25', 'Number of Trips 25-50', 'Number of Trips 50-100', 'Number of Trips 100-250', 'Number of Trips 250-500', 'Number of Trips >=500' are highly correlated with each other and with Population Not Staying at Home ( target variable), this will present the problem of multicollinearity if all variables are used in the model. Therefore only the variable "Number of Trips 50-100" which has the highes correlation is used along side week number as predictors.
In [27]:
%%time
#get the variables to use
vars = ['Population Not Staying at Home', 'Number of Trips 50-100', 'Week']
#subset
datax = data_selected[vars]
#drop nan
datax = datax.dropna().compute()
#get x
X = datax[['Number of Trips 50-100', 'Week']]
#get y
y = datax['Population Not Staying at Home']
print("X Shape:", X.shape)
print("\ny Shape:", y.shape)
#obtain x and y values
dask_x = X.values
dask_y = y.values
X Shape: (1035625, 2) y Shape: (1035625,) CPU times: total: 6.62 s Wall time: 11.4 s
In [28]:
# Split the data into training and testing sets (70:30 ratio)
X_train, X_test, y_train, y_test = train_test_split(dask_x, dask_y, test_size=0.3, random_state=42, shuffle = True)
#fit model
lr = LinearRegression(fit_intercept=True)
lr.fit(X_train, y_train)
#make predictions on test data
y_pred = lr.predict(X_test)
#evaluate the model
# Evaluate R-squared and RMSE
r_squared = round(r2_score(y_test, y_pred), 4)
rmse = round(mean_squared_error(y_test, y_pred, squared=False), 4)
print("R-Squared:", r_squared)
print("RMSE:", rmse)
R-Squared: 0.9783 RMSE: 1139104.8422
Increase Training Size¶
In [29]:
# Split the data into training and testing sets (70:30 ratio)
X_train, X_test, y_train, y_test = train_test_split(dask_x, dask_y, test_size=0.1, random_state=42, shuffle = True)
#fit model
lr = LinearRegression(fit_intercept=True)
lr.fit(X_train, y_train)
#make predictions on test data
y_pred = lr.predict(X_test)
#evaluate the model
# Evaluate R-squared and RMSE
r_squared1 = round(r2_score(y_test, y_pred), 4)
rmse1 = round(mean_squared_error(y_test, y_pred, squared=False), 4)
print("R-Squared:", r_squared1)
print("RMSE:", rmse1)
R-Squared: 0.9754 RMSE: 1201830.882
In [ ]:
In [30]:
# Split the data into training and testing sets (70:30 ratio)
X_train, X_test, y_train, y_test = train_test_split(dask_x, dask_y, test_size=0.2, random_state=42, shuffle = True)
#fit model
lr = LinearRegression(fit_intercept=True)
lr.fit(X_train, y_train)
#make predictions on test data
y_pred = lr.predict(X_test)
#evaluate the model
# Evaluate R-squared and RMSE
r_squared2 = round(r2_score(y_test, y_pred), 4)
rmse2 = round(mean_squared_error(y_test, y_pred, squared=False), 4)
print("R-Squared:", r_squared2)
print("RMSE:", rmse2)
R-Squared: 0.9759 RMSE: 1192543.3382
Reduce the training size¶
In [31]:
# Split the data into training and testing sets (70:30 ratio)
X_train, X_test, y_train, y_test = train_test_split(dask_x, dask_y, test_size=0.4, random_state=42, shuffle = True)
#fit model
lr = LinearRegression(fit_intercept=True)
lr.fit(X_train, y_train)
#make predictions on test data
y_pred = lr.predict(X_test)
#evaluate the model
# Evaluate R-squared and RMSE
r_squared3 = round(r2_score(y_test, y_pred), 4)
rmse3 = round(mean_squared_error(y_test, y_pred, squared=False), 4)
print("R-Squared:", r_squared3)
print("RMSE:", rmse3)
R-Squared: 0.9783 RMSE: 1114474.4401
Plot actual vs predicted¶
In [32]:
# Convert Dask DataFrames to Pandas DataFrames for plotting
y_test_pd = y_test
y_pred_pd = y_pred
# Plot actual vs. predicted values
plt.scatter(y_test_pd, y_pred_pd, alpha=0.5)
plt.title('Actual vs. Predicted Values')
plt.xlabel('Actual Values')
plt.ylabel('Predicted Values')
plt.grid()
plt.show()
Compare performance by data size¶
In [33]:
#data size
data_size = ['60%', '70%', '80%','90%']
r_squareds = [r_squared, r_squared1, r_squared2, r_squared3]
#rmses
rmses = [rmse, rmse1, rmse2, rmse3]
#dataframe
df = pd.DataFrame({
'Data Size': data_size,
'R-Squared': r_squareds,
'RMSE': [round(item, 2) for item in rmses]
})
# Sort DataFrame by RMSE
df.sort_values(by='RMSE')
Out[33]:
| Data Size | R-Squared | RMSE | |
|---|---|---|---|
| 3 | 90% | 0.9783 | 1114474.44 |
| 0 | 60% | 0.9783 | 1139104.84 |
| 2 | 80% | 0.9759 | 1192543.34 |
| 1 | 70% | 0.9754 | 1201830.88 |
In [34]:
#df.sort_values(by='RMSE').to_csv("performance.csv", index = False)
1F - Visualization¶
In [35]:
#read and process data
full_data, time_taken = read_data("data/Trips_Full Data.csv")
#confirm data is processed with no missing values
missing_full = full_data.isnull().sum().compute()
missing_full
Out[35]:
Month of Date 0 Week of Date 0 Year of Date 0 Level 0 Date 0 Week Ending Date 0 Trips <1 Mile 0 People Not Staying at Home 0 Population Staying at Home 0 Trips 0 Trips 1-25 Miles 0 Trips 1-3 Miles 0 Trips 10-25 Miles 0 Trips 100-250 Miles 0 Trips 100+ Miles 0 Trips 25-100 Miles 0 Trips 25-50 Miles 0 Trips 250-500 Miles 0 Trips 3-5 Miles 0 Trips 5-10 Miles 0 Trips 50-100 Miles 0 Trips 500+ Miles 0 dtype: int64
Visualize to check for distribution¶
In [36]:
boxplot_variables = ['People Not Staying at Home', 'Population Staying at Home',
]
# Select the desired variables for the box plots
boxplot_data = full_data.compute()[boxplot_variables]
# Melt the DataFrame to create long-form data suitable for box plots
melted_data = pd.melt(boxplot_data, var_name='Variable', value_name='Value')
# Create box plots using Plotly Express
fig = px.box(melted_data, x='Variable', y='Value', points="all", title='Box Plots for Trip-related Variables')
# Update layout for better visualization
fig.update_layout(xaxis_title='Variable', yaxis_title='Value')
# Show the figure
fig.show()
In [37]:
fig = px.line(full_data.compute(), x='Date', y='Trips <1 Mile', title='Number of participants making trips with < Mile Over Time')
# Update layout for better visualization
fig.update_layout(xaxis_title='Date', yaxis_title='Trips')
# Display the figure
fig.show()
In [38]:
boxplot_variables = ['Trips <1 Mile', 'Trips 1-25 Miles', 'Trips 1-3 Miles','Trips 10-25 Miles','Trips 3-5 Miles',
'Trips 5-10 Miles','Trips 25-50 Miles'
]
# Select the desired variables for the box plots
boxplot_data = full_data.compute()[boxplot_variables]
# Melt the DataFrame to create long-form data suitable for box plots
melted_data = pd.melt(boxplot_data, var_name='Variable', value_name='Value')
# Create box plots using Plotly Express
fig = px.box(melted_data, x='Variable', y='Value', points="all", title='Box Plots for Short Trips')
# Update layout for better visualization
fig.update_layout(xaxis_title='Variable', yaxis_title='Value')
# Show the figure
fig.show()
In [40]:
boxplot_variables = ['Trips 100-250 Miles', 'Trips 100+ Miles', 'Trips 25-100 Miles',
'Trips 250-500 Miles', 'Trips 50-100 Miles', 'Trips 500+ Miles'
]
# Select the desired variables for the box plots
boxplot_data = full_data.compute()[boxplot_variables]
# Melt the DataFrame to create long-form data suitable for box plots
melted_data = pd.melt(boxplot_data, var_name='Variable', value_name='Value')
# Create box plots using Plotly Express
fig = px.box(melted_data, x='Variable', y='Value', points="all", title='Box Plots for Long Trips')
# Update layout for better visualization
fig.update_layout(xaxis_title='Variable', yaxis_title='Value')
# Show the figure
fig.show()